package hadoop.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class DistributedCacheMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    HashMap<String,String> pdMap = new HashMap<>();
    Text k=new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //获取缓存的小表
        URI[] cacheFiles=context.getCacheFiles();
        String path=cacheFiles[0].getPath().toString();
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path),"UTF-8"));

        String line;
        while(StringUtils.isNotEmpty(line=reader.readLine())){
            //切割
            String[]fields=line.split("\t");
            pdMap.put(fields[0],fields[1]);
        }
        //关闭资源
        IOUtils.closeStream(reader);
    }


    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] fields = line.split("\t");
        //获取pid
        String pid=fields[1];
        //获取pid对应的pname
        String pname=pdMap.get(pid);
        line=line+"\t"+pname;
        k.set(line);
        context.write(k,NullWritable.get());
    }
}
